package cn.doitedu.rtdw.data_etl;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Light-aggregation ETL job for the ad impression / click event stream.
 *
 * <p>Pipeline:
 * <ol>
 *   <li>Read the common-dimension-widened event log from Kafka topic {@code mall-events-wide}.</li>
 *   <li>Keep only {@code ad_show} and {@code ad_click} events.</li>
 *   <li>Enrich each event with ad attributes through a processing-time lookup join against the
 *       HBase table {@code dim_ad_info}.</li>
 *   <li>Count impressions and clicks per 10-minute event-time tumbling window, grouped by user,
 *       province, device type, birthday, ad id, creative id and {@code ad_farther}, and write the
 *       result to the Doris table {@code dws.ad_ana_agg}.</li>
 * </ol>
 *
 * @Author: deep as the sea
 * @Site: <a href="http://www.51doit.com">51doit</a>
 * @QQ: 657270652
 * @Date: 2023/2/8
 * @Desc: Learn big data at 51doit. Light aggregation job for the ad impression/click stream.
 **/
public class E08_EtlJob_AdShowClickAgg {

    /**
     * Builds the streaming environment, registers all tables/views and submits the aggregation job.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create the streaming entry point.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once checkpoints every 2000 ms.
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        // NOTE(review): hard-coded local (Windows) checkpoint directory; only suitable for local runs.
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/checkpoint");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Registration order matters: each view references the tables registered before it.
        registerKafkaEventSource(tEnv);
        registerAdDimensionTable(tEnv);
        registerDorisSink(tEnv);
        registerAdEventsView(tEnv);
        registerJoinedView(tEnv);
        submitWindowAggregation(tEnv);
    }

    /** Registers the Kafka source over the common-dimension-widened event log. */
    private static void registerKafkaEventSource(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                " CREATE TABLE mall_events_commondim_kfksource(          "
                        + "     user_id           INT,                            "
                        + "     username          string,                         "
                        + "     event_time        bigint,                         "
                        + "     event_id          string,                         "
                        + "     device_type       string,                         "
                        + "     properties        map<string,string>,             "
                        + "     register_birthday string,                         "
                        + "     gps_province   STRING,                           "
                        // Processing-time attribute; required by the lookup join against HBase.
                        + "     pt AS proctime()  ,                              "
                        // Event-time attribute derived from the epoch-millis event_time column.
                        + "     rt as to_timestamp_ltz(event_time,3) ,           "
                        // Watermark is needed because the aggregation uses event-time windows.
                        // It allows zero out-of-orderness, so rows behind the watermark are
                        // treated as late by the window aggregation.
                        + "     watermark for  rt as rt - interval '0' seconds   "
                        + " ) WITH (                                             "
                        + "  'connector' = 'kafka',                              "
                        + "  'topic' = 'mall-events-wide',                       "
                        + "  'properties.bootstrap.servers' = 'doitedu:9092',    "
                        + "  'properties.group.id' = 'testGroup',                "
                        + "  'scan.startup.mode' = 'latest-offset',            "
                        + "  'value.format'='json',                              "
                        + "  'value.json.fail-on-missing-field'='false',         "
                        // NOTE(review): no key.format / key.fields are declared, so EXCEPT_KEY
                        // presumably behaves like the default (all columns from the value).
                        + "  'value.fields-include' = 'EXCEPT_KEY')              ");
    }

    /** Registers the HBase ad dimension table used for lookup enrichment. */
    private static void registerAdDimensionTable(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                "CREATE TABLE dim_ad_hbase( " +
                        " creative_id STRING, " +
                        " f ROW<creative_name STRING,ad_id STRING,ad_name STRING,ad_farther STRING,ad_campain STRING>, " +
                        " PRIMARY KEY (creative_id) NOT ENFORCED " +
                        ") WITH (                             " +
                        " 'connector' = 'hbase-2.2',          " +
                        " 'table-name' = 'dim_ad_info',     " +
                        " 'zookeeper.quorum' = 'doitedu:2181' " +
                        ")");
    }

    /** Registers the Doris sink table that receives the windowed aggregates. */
    private static void registerDorisSink(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                " create table adshow_agg_01_doris(   "
                        + " dt  DATE,                     "
                        + " window_start timestamp(3),    "
                        + " window_end   timestamp(3),    "
                        + " user_id   int,            "
                        + " province  varchar(20),    "
                        + " device_type varchar(20),  "
                        + " user_birthday  STRING ,   "
                        + " ad_id   varchar(10),      "
                        + " ad_creative  varchar(20), "
                        + " ad_father    varchar(20), "
                        + " ad_show_cnt  int,         "
                        + " ad_click_cnt int          "
                        + " ) WITH (                           "
                        + "    'connector' = 'doris',          "
                        + "    'fenodes' = 'doitedu:8030',     "
                        + "    'table.identifier' = 'dws.ad_ana_agg',  "
                        + "    'username' = 'root',            "
                        + "    'password' = '',                "
                        // A fresh label prefix per launch, derived from the submit time.
                        + "    'sink.label-prefix' = 'doris_tl" + System.currentTimeMillis() + "')");
    }

    /** Registers a view that keeps only ad impression and ad click events. */
    private static void registerAdEventsView(StreamTableEnvironment tEnv) {
        tEnv.executeSql("create temporary view ad_events AS " +
                "select * from mall_events_commondim_kfksource " +
                "where event_id in ('ad_show','ad_click')");
    }

    /**
     * Registers a view that enriches ad events with HBase ad attributes (no aggregation here).
     *
     * <p>The lookup is a LEFT JOIN, so events without a dimension row are kept with NULL ad
     * attributes. The creative id is therefore taken from the event itself rather than from the
     * dimension row: on a match both are equal, and on a miss the event's id is preserved instead
     * of becoming NULL.
     */
    private static void registerJoinedView(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                " create temporary view joined AS                                                   "
                        + " SELECT                                                                            "
                        + "   e.user_id                                                                       "
                        + "   ,e.event_id                                                                       "
                        + "   ,e.gps_province as province                                                     "
                        + "   ,e.device_type                                                                  "
                        + "   ,e.register_birthday as user_birthday                                           "
                        + "   ,d.f.ad_id                                                                      "
                        + "   ,e.properties['creative_id'] as ad_creative_id                                  "
                        + "   ,d.f.ad_farther                                                                 "
                        // rt keeps its event-time attribute through the lookup join, so it can
                        // drive the tumbling window below.
                        + "   ,e.rt                                                                           "
                        + " FROM  ad_events e                                                                 "
                        + " LEFT JOIN dim_ad_hbase                                                            "
                        // Processing-time lookup: the dimension row current at pt is used.
                        + " FOR SYSTEM_TIME AS OF e.pt AS d                                                   "
                        + " ON e.properties['creative_id'] = d.creative_id                                    "
        );
        //tEnv.executeSql("select * from joined").print();
    }

    /**
     * Submits the INSERT job: 10-minute event-time tumbling windows counting impressions and clicks.
     *
     * <p>INSERT INTO maps columns by position, so {@code ad_creative_id} lands in the sink's
     * {@code ad_creative} column and {@code ad_farther} in {@code ad_father}. The returned
     * TableResult is not awaited; {@code executeSql} only submits the job.
     */
    private static void submitWindowAggregation(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                "INSERT INTO  adshow_agg_01_doris  " +
                        "SELECT                                                                            "
                        + "   TO_DATE(date_format(window_start,'yyyy-MM-dd')) AS dt,                          "
                        + "   window_start,                                                                   "
                        + "   window_end,                                                                     "
                        + "   user_id,                                                                        "
                        + "   province,                                                                       "
                        + "   device_type,                                                                    "
                        + "   user_birthday,                                                                  "
                        + "   ad_id,                                                                          "
                        + "   ad_creative_id,                                                                 "
                        + "   ad_farther,                                                                     "
                        + "   sum(if(event_id = 'ad_show',1,0)) as ad_show_cnt,                          "
                        + "   sum(if(event_id = 'ad_click',1,0)) as ad_click_cnt                         "
                        + " FROM TABLE(                                                                       "
                        + "  TUMBLE(TABLE joined,DESCRIPTOR(rt),INTERVAL '10' MINUTES )                       "
                        + " )                                                                                 "
                        + " GROUP BY                                                                          "
                        + "   window_start,                                                                   "
                        + "   window_end,                                                                     "
                        + "   user_id,                                                                        "
                        + "   province,                                                                       "
                        + "   device_type,                                                                    "
                        + "   user_birthday,                                                                  "
                        + "   ad_id,                                                                          "
                        + "   ad_creative_id,                                                                 "
                        + "   ad_farther                                                                      "
        );
    }

}
